package org.example.async;

import java.util.*;
import java.util.concurrent.*;

public class ConcurrentOps {
    /*---------------- How to use ----------------*/
    private static void tryExchange() {
        final Exchanger<Set<String>> exchanger = new Exchanger<>();
        new Consumer(exchanger).start();
        new Producer(exchanger).start();
    }

    private static void trySemaphore() {
        Semaphore sema = new Semaphore(3);
        for (int i = 0; i < 100; i++) {
            new Thread(new Seph(sema)).start();
        }
    }

    private static void tryCyclicBarrier() {
        long start = System.currentTimeMillis();
        CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {
            @Override
            public void run() {
                System.out.println("all thread end");
            }
        });
        new Thread(new Barrier(1, barrier)).start();
        new Thread(new Barrier(2, barrier)).start();
        System.out.println("barrier total=" + (System.currentTimeMillis() - start) + "ms");
    }

    private static void tryFuture() {
        long start = System.currentTimeMillis();
        ExecutorService pool = Executors.newFixedThreadPool(100);
        // 分组
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < 10; ++i) {
            futures.add(pool.submit(new Worker()));
        }
        // 回收, 性能好碰见超时的会丢弃, 不过有时候也刹不住扯车。
        futures.stream().forEach(val -> {
            try {
                System.out.println(val.get(10, TimeUnit.MILLISECONDS));
            } catch (InterruptedException | ExecutionException | TimeoutException e) {

            }
        });
        pool.shutdown();
        System.out.println("future total=" + (System.currentTimeMillis() - start) + "ms");
    }

    private static void tryCountDownLatch() throws InterruptedException {
        long start = System.currentTimeMillis();
        ExecutorService pool = Executors.newFixedThreadPool(100);
        // 要10个肯定能返回10个, 会等结束完了再返回, 设置超时没啥用, 同样场景比原始future多100ms开销
        final CountDownLatch cdl = new CountDownLatch(10);
        List<Future<String>> tasks = new ArrayList<>();
        long counter = cdl.getCount();
        for (int i = 0; i < counter; i++) {
            tasks.add(pool.submit(new Latcher(cdl)));
        }

        cdl.await();
        tasks.stream().forEach(val -> {
            try {
                System.out.println(val.get(1, TimeUnit.MILLISECONDS));
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                throw new RuntimeException(e);
            }
        });
        System.out.println("latch total=" + (System.currentTimeMillis() - start) + "ms");
        pool.shutdown();
    }

    private static void runnableAndFuture() throws ExecutionException, InterruptedException {
        ExecutorService execs = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
        Callable<String> c1 = new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "foobar";
            }
        };
        FutureTask<String> ft = new FutureTask<>(c1);
        Thread t3 = new Thread(ft);
        t3.start();
        if (!ft.isDone()) {
            System.out.println("unfinished");
            System.out.printf("[%s] is running%n", Thread.currentThread().getName());
            T1 t1 = new T1();
            execs.execute(t1);
            Runnable t2 = () -> System.out.printf("[%s] is running%n", Thread.currentThread().getName());
            execs.execute(t2);
        }

        System.out.println(ft.get());
        execs.shutdown();
    }
}
/*---------------- How to use ----------------*/

class T1 extends Thread {
    @Override
    public void run() {
        System.out.printf("[%s] is running%n", getName());
    }
}

class Latcher implements Callable<String> {
    private CountDownLatch latch;

    public Latcher(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public String call() throws Exception {
        int serial = (int) (Math.random() * 100);
        Thread.sleep(serial);
        latch.countDown();

        return String.format("latch return cost=%sms", serial);
    }
}

class Worker implements Callable<String> {

    @Override
    public String call() throws Exception {
        int serial = (int) (Math.random() * 100);
        Thread.sleep(serial);

        return String.format("future return cost=%sms", serial);
    }
}

class Barrier implements Runnable {
    private CyclicBarrier barr;
    private Integer serial;

    public Barrier(Integer serial, CyclicBarrier barr) {
        this.barr = barr;
        this.serial = serial;
    }

    @Override
    public void run() {
        try {
            int sleep = (int) (Math.random() * 100);
            System.out.println("sleep for " + sleep + "ms");
            Thread.sleep(sleep);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.printf("Runnable-%s start%n", this.serial);
        try {
            barr.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
        System.out.printf("Runnable-%s end%n", this.serial);
    }
}

class Seph implements Runnable {
    static final List<String> resources = Arrays.asList("signal-0", "signal-1", "signal-2");
    Semaphore remain;

    public Seph(Semaphore remain) {
        this.remain = remain;
    }

    String doSomething() throws InterruptedException {
        long start = System.currentTimeMillis();
        remain.acquire();
        System.out.printf("Thread %d take %d ms to get resource, remain permits:%s\n", Thread.currentThread().getId(), System.currentTimeMillis() - start, remain.availablePermits());
        String result = "";
        synchronized (resources) {
            result = resources.get((int) (Math.random() * 3));
            long duration = (long) (Math.random() * 100);
            System.out.printf("Thread %d aquire resources for %sms%n", Thread.currentThread().getId(), duration);
            Thread.sleep(duration);
        }

        remain.release();
        System.out.printf("Thread %d release resource, remain permits:%s\n", Thread.currentThread().getId(), remain.availablePermits());

        return result;
    }

    @Override
    public void run() {
        try {
            doSomething();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Consumer extends Thread {
    Exchanger<Set<String>> exchanger;

    public Consumer(Exchanger<Set<String>> exchanger) {
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        Set<String> set = new HashSet<>();
        while (!isInterrupted()) {
            for (int i = 0; i < 10; i++) {
                set.add(Integer.toString(i));
            }
            try {
                Thread.sleep(1000);
                System.out.printf("%d products produced\n", set.size());
                set = exchanger.exchange(set);
                System.out.printf("get empty set from consumer, size is %d\n", set.size());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Producer extends Thread {
    Exchanger<Set<String>> exchanger;

    public Producer(Exchanger<Set<String>> exchanger) {
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        Set<String> set = new HashSet<>();
        while (!isInterrupted()) {
            try {
                if (!set.isEmpty()) {
                    set.clear();
                    System.out.printf("consumer over, size is %d\n", set.size());
                }
                set = exchanger.exchange(set);
                System.out.printf("get products from producer, size is %d\n", set.size());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
